package com.flink.test;

import com.google.gson.Gson;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

/**
 * @Description 向kafka发送模拟各省市的天气采集数据
 * @Author JL
 * @Date 2021/03/23
 * @Version V1.0
 */
public class WeatherToKafkaMsg {

    static String cityNames = "海门,鄂尔多斯,招远,舟山,齐齐哈尔,盐城,赤峰,青岛,乳山,金昌,泉州,莱西,日照,胶南,南通,拉萨," +
            "云浮,梅州,文登,上海,攀枝花,威海,承德,厦门,汕尾,潮州,丹东,太仓,曲靖,烟台,福州,瓦房店,即墨,抚顺,玉溪,张家口," +
            "阳泉,莱州,湖州,汕头,昆山,宁波,湛江,揭阳,荣成,连云港,葫芦岛,常熟,东莞,河源,淮安,泰州,南宁,营口,惠州,江阴,蓬莱," +
            "韶关,嘉峪关,广州,延安,太原,清远,中山,昆明,寿光,盘锦,长治,深圳,珠海,宿迁,咸阳,铜川,平度,佛山,海口,江门,章丘," +
            "肇庆,大连,临汾,吴江,石嘴山,沈阳,苏州,茂名,嘉兴,长春,胶州,银川,张家港,三门峡,锦州,南昌,柳州,三亚,自贡,吉林," +
            "阳江,泸州,西宁,宜宾,呼和浩特,成都,大同,镇江,桂林,张家界,宜兴,北海,西安,金坛,东营,牡丹江,遵义,绍兴,扬州,常州," +
            "潍坊,重庆,台州,南京,滨州,贵阳,无锡,本溪,克拉玛依,渭南,马鞍山,宝鸡,焦作,句容,北京,徐州,衡水,包头,绵阳,乌鲁木齐," +
            "枣庄,杭州,淄博,鞍山,溧阳,库尔勒,安阳,开封,济南,德阳,温州,九江,邯郸,临安,兰州,沧州,临沂,南充,天津,富阳,泰安," +
            "诸暨,郑州,哈尔滨,聊城,芜湖,唐山,平顶山,邢台,德州,济宁,荆州,宜昌,义乌,丽水,洛阳,秦皇岛,株洲,石家庄,莱芜,常德," +
            "保定,湘潭,金华,岳阳,长沙,衢州,廊坊,菏泽,合肥,武汉,大庆";

    /**
     * 主程入口
     * @param args
     */
    public static void main(String[] args) throws Exception {
        WeatherToKafkaMsg weatherToKafkaMsg = new WeatherToKafkaMsg();
        //生产者发送消息
        KafkaUtils.KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("192.168.110.35", 9092);
        String topic = "weather_behavior";
        String [] citys = StringUtils.split(cityNames, ",");
        while(true){
            String city  = citys[RandomUtils.nextInt(0, citys.length)] ;
            String site = String.format("%s-%s",city,RandomUtils.nextInt(0, 100));
            Integer air = RandomUtils.nextInt(10, 200);
            Double temperature = RandomUtils.nextDouble(0.0, 40.00);
            BigDecimal temperatureBig = new BigDecimal(temperature);
            temperature = temperatureBig.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            Integer humidity = 0;
            Integer windLevel = 0;
            if (temperature<0){
                humidity = RandomUtils.nextInt(0, 5);
                windLevel = RandomUtils.nextInt(0, 6);
            }else if (temperature>0 && temperature<15){
                humidity = RandomUtils.nextInt(5, 25);
                windLevel = RandomUtils.nextInt(0, 8);
            } else {
                humidity = RandomUtils.nextInt(25, 75);
                windLevel = RandomUtils.nextInt(0, 10);
            }
            Long createTimeSeries = System.currentTimeMillis();
            //地市，站点，PM2.5，温度，湿度，风速，采集时间
            Weather weather = weatherToKafkaMsg.new Weather(city,site,air,temperature,humidity,windLevel,createTimeSeries);
            String weatherJson = new Gson().toJson(weather);
//            System.out.println(weatherJson);
            //向kafka队列发送数据
            kafkaStreamServer.sendMsg(topic, weatherJson);
            //线程休眠
            TimeUnit.MILLISECONDS.sleep(200);
        }
    }

    /**
     * 全国各省市》天气预报采集
     * 地市，站点，PM2.5，温度，湿度，风速，采集时间
     */
    class Weather{

        private String city;
        private String site;
        private Integer air;//pm2.5 = 0~250
        private Double temperature;
        private Integer humidity;
        private Integer windLevel;
        private Long createTimeSeries;

        public Weather(String city, String site, Integer air, Double temperature, Integer humidity, Integer windLevel, Long createTimeSeries) {
            this.city = city;
            this.site = site;
            this.air = air;
            this.temperature = temperature;
            this.humidity = humidity;
            this.windLevel = windLevel;
            this.createTimeSeries = createTimeSeries;
        }

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public String getSite() {
            return site;
        }

        public void setSite(String site) {
            this.site = site;
        }

        public Integer getAir() {
            return air;
        }

        public void setAir(Integer air) {
            this.air = air;
        }

        public Double getTemperature() {
            return temperature;
        }

        public void setTemperature(Double temperature) {
            this.temperature = temperature;
        }

        public Integer getHumidity() {
            return humidity;
        }

        public void setHumidity(Integer humidity) {
            this.humidity = humidity;
        }

        public Integer getWindLevel() {
            return windLevel;
        }

        public void setWindLevel(Integer windLevel) {
            this.windLevel = windLevel;
        }

        public Long getCreateTimeSeries() {
            return createTimeSeries;
        }

        public void setCreateTimeSeries(Long createTimeSeries) {
            this.createTimeSeries = createTimeSeries;
        }

        @Override
        public String toString() {
            return "Weather{" +
                    "city='" + city + '\'' +
                    ", site='" + site + '\'' +
                    ", air=" + air +
                    ", temperature=" + temperature +
                    ", humidity=" + humidity +
                    ", windSpeed=" + windLevel +
                    ", createTimeSeries=" + createTimeSeries +
                    '}';
        }
    }

}
